Spark Operations Examples

Main Spark functionality
# -*- coding: utf-8 -*-
import os

os.chdir("/home/cloudops/spark")
os.getcwd()     # confirm the working directory
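
# Note: the rest of this script assumes a SparkContext named `sc`, as in the
# pyspark shell. A minimal sketch for running it standalone (the app name and
# local master below are illustrative choices, not part of the original
# script); getOrCreate() reuses an existing context if one is already active:
from pyspark import SparkConf, SparkContext
sc = SparkContext.getOrCreate(
    SparkConf().setAppName("spark-ops-examples").setMaster("local[*]"))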

#============================
# Loading Data From Files
#============================
# Lazy evaluation - the file is not read until an action is called
autoData = sc.textFile("data/auto-data.csv")
autoData.cache()

# The data is loaded only now, when an action runs
autoData.count()
autoData.first()
autoData.take(5)

for line in autoData.collect():
    print(line)
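
# Note: collect() brings the entire RDD back to the driver; that is fine for
# this small file, but prefer take(n) when exploring large datasets.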

#============================
# Loading Data From a Collection
#============================
collData = sc.parallelize([3,5,4,7,4])
collData.cache()
collData.count()

#============================
# Transformations
#============================

# Map (and create a new RDD)
tsvData = autoData.map(lambda x : x.replace(",","\t"))
tsvData.take(5)

# Filter (and create a new RDD)
toyotaData = autoData.filter(lambda x: "toyota" in x)
toyotaData.count()

# FlatMap (and create a new RDD)
words = toyotaData.flatMap(lambda line: line.split(","))
words.take(20)
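
# Unlike map (one output element per input line), flatMap flattens the
# per-line lists into a single RDD of individual words.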

# Distinct (collect() triggers execution)
for numbData in collData.distinct().collect():
    print(numbData, end = ', ') # 3, 5, 4, 7

# Set operations
words1 = sc.parallelize(["hello","war","peace","world"])
words2 = sc.parallelize(["war","peace","universe"])

# Union
for word in words1.union(words2).distinct().collect():
    print(word, end = ", ")     # peace, world, universe, hello, war,

# Intersection
for word in words1.intersection(words2).collect():
    print(word, end = ", ")     # peace, war,
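
# Subtract - a small additional sketch along the same lines: elements of
# words1 that are not in words2
for word in words1.subtract(words2).collect():
    print(word, end = ", ")     # hello, world, (order may vary)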

#============================
# Actions
#============================

# Reduce
collData.reduce(lambda x, y: x + y)   # 23

# Find the SHORTEST LINE
autoData.reduce(lambda x,y: x if len(x) < len(y) else y)
# 'bmw,gas,std,two,sedan,rwd,six,182,5400,16,22,41315'

# Aggregations
# =============
# Performs the same work as the reduce above
seqOp = (lambda x, y: (x+y))
combOp = (lambda x, y: (x+y))
collData.aggregate(0, seqOp, combOp)   # 23

# Do addition and multiplication at the same time

# In seqOp, x is the running (sum, product) tuple and y is the next element
seqOp = (lambda x, y: (x[0]+y, x[1]*y))

# In combOp, both x and y are (sum, product) tuples from different partitions
combOp = (lambda x, y: (x[0]+y[0], x[1]*y[1]))

# initial values: x[0] = 0 (sum), x[1] = 1 (product)
collData.aggregate((0,1), seqOp, combOp)  # (23, 1680)
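
# aggregate() applies seqOp to the elements within each partition and then
# merges the per-partition results with combOp, so the answer is the same
# regardless of partitioning - an illustrative check with 2 partitions:
sc.parallelize([3, 5, 4, 7, 4], 2).aggregate((0, 1), seqOp, combOp)  # (23, 1680)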

#============================
# Functions in Spark
#============================

# Cleanse and transform an RDD
def cleanseRDD(autoStr) :
    if isinstance(autoStr, int) :
        return autoStr
    attList = autoStr.split(",")
    # convert doors to a number
    if attList[3] == "two" :
        attList[3] = "2"
    else :
        attList[3] = "4"
    # Convert Drive to uppercase
    attList[5] = attList[5].upper()
    return ",".join(attList)

cleanedData = autoData.map(cleanseRDD)
cleanedData.collect()
# ['MAKE,FUELTYPE,ASPIRE,4,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
# 'subaru,gas,std,2,hatchback,FWD,four,69,4900,31,36,5118',
# 'chevrolet,gas,std,2,hatchback,FWD,three,48,5100,47,53,5151',
# 'mazda,gas,std,2,hatchback,FWD,four,68,5000,30,31,5195',
# . . .

# Use a helper function inside the reduce
def getMPG( autoStr) :
    # x may already be the previously calculated running total
    if isinstance(autoStr, int) :
        return autoStr
    # otherwise parse the MPG-City value from the line
    attList = autoStr.split(",")
    if attList[9].isdigit() :
        return int(attList[9])
    else:
        return 0

# Find average MPG-City for ALL cars
autoData.reduce(lambda x,y : getMPG(x) + getMPG(y)) \
    / (autoData.count() - 1)     # -1 excludes the header row
# 25.15228426395939
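
# An equivalent sketch (assuming every data row has a numeric MPG-City):
# drop the header explicitly, map each row to its MPG value, and use the
# built-in mean() action instead of a hand-rolled reduce
headerRow = autoData.first()
autoData.filter(lambda line: line != headerRow) \
    .map(lambda line: int(line.split(",")[9])) \
    .mean()     # same average as above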

#============================
# Working with Key/Value RDDs
#============================

# Create a K:V RDD of auto Brand and Horsepower
cylData = autoData.map( lambda x: \
        ( x.split(",")[0], x.split(",")[7]))
cylData.take(3)
# [('MAKE', 'HP'), ('subaru', '69'), ('chevrolet', '48')]

cylData.keys().collect()    # repeated keys
cylData.keys().count()      # 198

# Remove header row - new RDD
header = cylData.first()
cylHPData= cylData.filter(lambda line: line != header)
# cylHPData.collect()

# Add a count of 1 to each record, then reduce by key
# to get the total HP and record count per brand
brandValues=cylHPData.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: (int(x[0]) + int(y[0]), \
    x[1] + y[1]))
brandValues.collect()
# [('subaru', (1035, 12)),
#  ('chevrolet', (188, 3)),
# ('mazda', (1390, 16)),
# . . .
# ('porsche', (764, 4)),
# ('mercedes-benz', (1170, 8)),
# ('jaguar', (614, 3))]

# Find Average by dividing HP total by count total
brandValues.mapValues(lambda x: int(x[0])/int(x[1])).collect()
# [('subaru', 86.25),
#  ('chevrolet', 62.666666666666664),
#  ('mazda', 86.875),
# . . .
# ('porsche', 191.0),
# ('mercedes-benz', 146.25),
# ('jaguar', 204.66666666666666)]
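
# A follow-on sketch: rank brands by average horsepower, highest first
# (top-3 shown; sortBy is a standard RDD transformation)
brandValues.mapValues(lambda x: int(x[0]) / int(x[1])) \
    .sortBy(lambda kv: kv[1], ascending=False) \
    .take(3)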

#============================
# Advanced Spark : Accumulators & Broadcast Variables
#============================

# Function that splits the line as well as counts sedans
# and hatchbacks

# Speed optimization
# ==================
# Broadcast variables ship read-only values to each executor once;
# accumulators count as a side effect instead of extra filter/count passes.

# Initialize accumulators (global)
sedanCount = sc.accumulator(0)
hatchbackCount = sc.accumulator(0)

# Set broadcast variables (read-only, shipped once to each executor)
sedanText = sc.broadcast("sedan")
hatchbackText = sc.broadcast("hatchback")

def splitLines(line) :

    global sedanCount
    global hatchbackCount

    # Use broadcast variable to do comparison
    # and set accumulator
    if sedanText.value in line:
        sedanCount += 1
    if hatchbackText.value in line:
        hatchbackCount += 1

    return line.split(",")


# Do the map
splitData = autoData.map(splitLines)

# An action forces the lazy map to actually execute
splitData.count()
print(sedanCount, hatchbackCount)    # 92 67
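
# Caveat: accumulator updates made inside transformations (like the map
# above) may be applied more than once if a task is re-executed; Spark only
# guarantees exactly-once accumulator updates inside actions.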

#============================
# Advanced Spark : Partitions
#============================
collData.getNumPartitions()     # 1

# Specify no. of partitions
collData = sc.parallelize([3,5,4,7,4], 2)
collData.cache()
collData.count()              # 5

collData.getNumPartitions()   # 2
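
# A small sketch to see how the elements are spread across the partitions
# (glom() returns one list per partition; the exact split may vary)
collData.glom().collect()     # e.g. [[3, 5], [4, 7, 4]]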

# The Spark UI at http://localhost:4040 shows the current Spark application